"""Calculation datasets.
This module deals with the handling of series of calculations.
Classes and functions of this module are meant to simplify the approach to
ensemble calculations with the code, and to deal with parallel executions of
multiple instances of the code.
"""
import logging
from BigDFT.LogUtils import format_iterable
from BigDFT.Calculators import Runner
# create and link logger
module_logger = logging.getLogger(__name__)
def _is_WSL():
"""Determine if we are running on WSL
':' is an illegal character on windows filesystems,
determine if the current run is on a Windows Subsystem for Linux and alter
the file transfer accordingly
Returns:
bool: Whether we are running in a WSL environment or not
"""
import os
osinfo = os.uname().release
WSL = 'WSL' in osinfo
module_logger.debug(f'checking for WSL using os release: {osinfo}: '
f'{WSL}')
return WSL
[docs]def name_from_id(id):
"""Hash the id into a run name
Construct the name of the run from the id dictionary
Args:
id (dict): id associated to the run
Returns:
str: name of the run associated to the dictionary ``id``
"""
jchar = '__' # '' if _is_WSL() else ':'
return '-_-'.join([jchar.join([k, str(id[k])]) for k in sorted(id)])
# keys = list(id.keys())
# keys.sort()
# name = ''
# for k in keys:
# name += k + '=' + str(id[k]) + ','
# ','.join([k+'='+str(i))
# return name.rstrip(',')
[docs]def names_from_id(id):
"""
Hash the id into a list of run names to search with the
function id_in_names
and add the separator '-_-' to have the proper value of a key
(to avoid 0.3 in 0.39)
"""
if id is None:
return ['']
else:
return [name_from_id({k: v})+'-_-' for k, v in id.items()]
[docs]class Dataset(Runner):
"""A set of calculations.
Such class contains the various instances of a set of calculations with the
code.
The different calculations are labelled by parameter values and information
that might then be retrieved for inspection and plotting.
Args:
label (str): The label of the dataset. It will be needed to identify the
instance for example in plot titles or in the running directory.
run_dir (str): path of the directory where the runs will be performed.
input (dict): Inputfile to be used for the runs as default,
can be overridden by the specific inputs of the run
"""
def __init__(self, label='BigDFT dataset', run_dir='runs', **kwargs):
"""
Set the dataset ready for appending new runs
"""
self._logger = logging.getLogger(__name__ + '.Dataset')
self._logger.info('initialise a dataset with args:')
self._logger.info(f'{format_iterable(locals())}')
from copy import deepcopy
newkwargs = deepcopy(kwargs)
Runner.__init__(self, label=label, run_dir=run_dir, **newkwargs)
self.runs = []
"""
List of the runs which have to be treated by the dataset these runs
contain the input parameter to be passed to the various runners.
"""
self.calculators = []
"""
Calculators which will be used by the run method, useful to gather the
inputs in the case of a multiple run.
"""
self.results = {}
"""
Set of the results of each of the runs. The set is not ordered as the
runs may be executed asynchronously.
"""
self.ids = []
"""
List of run ids, to be used in order to classify and fetch the results
"""
self.names = []
"""
List of run names, needed for distinguishing the logfiles and
input files. Each name should be unique to correctly identify a run.
"""
self._post_processing_function = None
[docs] def append_run(self, id, runner, **kwargs):
"""Add a run into the dataset.
Append to the list of runs to be performed the corresponding runner and
the arguments which are associated to it.
Args:
id (dict): the id of the run, useful to identify the run in the
dataset. It has to be a dictionary as it may contain
different keyword. For example a run might be classified as
``id = {'hgrid':0.35, 'crmult': 5}``.
runner (Runner): the runner class to which the remaining keyword
arguments will be passed at the input.
Raises:
ValueError: if the provided id is identical to another previously
appended run.
Todo:
include id in the runs specification
"""
from copy import deepcopy
name = name_from_id(id)
if name in self.names:
raise ValueError('The run id', name,
' is already provided, modify the run id.')
self.names.append(name)
# create the input file for the run, combining run_dict and input
inp_to_append = deepcopy(self._global_options)
inp_to_append.update(deepcopy(kwargs))
# get the number of this run
irun = len(self.runs)
# append it to the runs list
self.runs.append(inp_to_append)
# append id and name
self.ids.append(id)
# search if the calculator already exists
found = False
for run in self.calculators:
calc = run["calc"]
options = {k: v for k, v in calc.global_options().items()
if k in inp_to_append}
if options == inp_to_append:
run["runs"].append(irun)
found = True
break
if not found:
self.calculators.append({'calc': runner, 'runs': [irun]})
[docs] def process_run(self):
"""
Run the dataset, by performing explicit run of each of the item of the
runs_list.
"""
self._run_the_calculations()
return {}
def _run_the_calculations(self, selection=None, extra_run_args=None):
self._logger.info('running all calculations')
from copy import deepcopy
for c in self.calculators:
calc = c['calc']
# we must here differentiate between a taskgroup run and a
# separate run
for r in c['runs']:
if selection is not None and r not in selection:
self._logger.debug(f'{r} not selected, skipping')
continue
inp = self.runs[r]
name = self.names[r]
local_inp = {k: v
for k, v in self.local_options.items()
if k in inp}
if len(local_inp) == 0:
tmp_inp = inp
else:
tmp_inp = deepcopy(inp)
tmp_inp.update(local_inp)
if extra_run_args is not None:
tmp_inp.update(extra_run_args)
self.results[r] = calc.run(name=name, **tmp_inp)
[docs] def set_postprocessing_function(self, func):
"""Set the callback of run.
Calls the function ``func`` after having performed the appended runs.
Args:
func (func): function that process the `inputs` `results` and
returns the value of the `run` method of the dataset.
The function is called as ``func(self)``.
"""
self._logger.info(f'setting postprocessing function to {func}')
self._post_processing_function = func
[docs] def post_processing(self, **kwargs):
"""
Calls the Dataset function with the results of the runs as arguments
"""
if self._post_processing_function is not None:
return self._post_processing_function(self)
else:
return self.results
[docs] def fetch_results(self, id=None, attribute=None, run_if_not_present=False):
"""Retrieve some attribute from some of the results.
Selects out of the results the objects which have in their ``id``
at least the dictionary specified as input. May return an attribute
of each result if needed.
Args:
id (dict): dictionary of the retrieved id. Return a list of the runs
that have the ``id`` argument inside the provided ``id`` in the
order provided by :py:meth:`append_run`.
If absent, then the entire list of runs is returned.
attribute (str): if present, provide the attribute of each of the
results instead of the result object
run_if_not_present (bool): If the run has not yet been performed
in the dataset then perform it.
Example:
>>> study=Dataset()
>>> study.append_run(id={'cr': 3}, input={'dft':{'rmult':[3,8]}})
>>> study.append_run(id={'cr': 4}, input={'dft':{'rmult':[4,8]}})
>>> study.append_run(id={'cr': 3, 'h': 0.5},
>>> input={'dft':{'hgrids': 0.5, 'rmult':[4,8]}})
>>> #append other runs if needed
>>> #run the calculations (optional if run_if_not_present=True)
>>> study.run()
>>> # returns a list of the energies of first and the third result
>>> # in this example
>>> data=study.fetch_results(id={'cr': 3}, attribute='energy')
"""
self._logger.info('fetching dataset results')
if id is None:
fetch_indices = list(range(len(self.names)))
if run_if_not_present:
selection_to_run = fetch_indices
else:
selection_to_run = []
self._logger.debug('no id specified, indices are:')
self._logger.debug(f'{format_iterable(fetch_indices)}')
else:
names = names_from_id(id)
fetch_indices = []
selection_to_run = []
for irun, name in enumerate(self.names):
# add the separator '-_-' to have the proper value of a key
# (to avoid 0.3 in 0.39)
if not all([(n in name+'-_-') for n in names]):
continue
if run_if_not_present and irun not in self.results:
selection_to_run.append(irun)
fetch_indices.append(irun)
self._logger.debug('specified id:')
self._logger.debug(f'{format_iterable(id)}')
self._logger.debug('indices are:')
self._logger.debug(f'{format_iterable(fetch_indices)}')
# TODO(lbeal) ensure that forcing anyfile=True here is safe
if len(selection_to_run) > 0:
self._run_the_calculations(selection=selection_to_run,
extra_run_args=dict(anyfile=True,
force=False))
self._logger.debug('run section complete, retreiving results...')
data = []
for irun in fetch_indices:
self._logger.debug(f'...for run {irun}')
r = self.results.get(irun, None)
if r is None:
self._logger.debug('results returned None, possibly due to '
'async run. Attempting to call the '
'individual runner.')
calc = self.calculators[irun]['calc']
# ensure the remote_dir and local_dir is propagated to runner
if not hasattr(calc, 'remote_directory'):
remote_dir = self._global_options.get('remote_dir', None)
calc.remote_directory = remote_dir
self._logger.debug(f'update run {irun} remote_dir '
f'option to {remote_dir}')
if not hasattr(calc, 'local_directory'):
local_dir = self._global_options.get('local_dir', None)
if local_dir is None:
local_dir = self._global_options.get('run_dir', None)
calc.local_directory = local_dir
self._logger.debug(f'update run {irun} local_dir '
f'option to {local_dir}')
resultfile = calc.remote_function._make_run('',
dry_run=True)
self._logger.debug('updating calc result file to '
f'{resultfile}')
calc.resultfile = resultfile
if hasattr(calc, 'resultfiles'):
calc.resultfiles.append(resultfile)
else:
calc.resultfiles = [resultfile]
r = calc.fetch_result()
data.append(r if attribute is None else getattr(r, attribute))
self._logger.debug(f'Done. data len: {len(data)}')
return data
[docs] def seek_convergence(self, rtol=1.e-5, atol=1.e-8, selection=None,
**kwargs):
"""
Search for the first result of the dataset which matches the provided
tolerance parameter. The results are in dataset order
(provided by the :py:meth:`append_run` method) if `selection` is not
specified.
Employs the numpy :py:meth:`allclose` method for comparison.
Args:
rtol (float): relative tolerance parameter
atol (float): absolute tolerance parameter
selection (list): list of the id of the runs in which to perform the
convergence search. Each id should be unique in the dataset.
**kwargs: arguments to be passed to the :py:meth:`fetch_results`
method.
Returns:
id,result (tuple): the id of the last run which matches the
convergence, together with the result, if convergence is
reached.
Raises:
LookupError: if the parameter for convergence were not found.
The dataset has to be enriched or the convergence parameters
loosened.
"""
from numpy import allclose
from futile.Utils import write
to_get = self.ids if selection is None else selection
id_ref = to_get[0]
write('Fetching results for id "', id_ref, '"')
ref = self.fetch_results(id=id_ref, **kwargs)
ref = ref[0]
for id in to_get[1:]:
write('Fetching results for id "', id, '"')
val = self.fetch_results(id=id, **kwargs)
val = val[0]
if allclose(ref, val, rtol=rtol, atol=atol):
res = self.fetch_results(id=id_ref)
label = self.get_global_option('label')
write('Convergence reached in Dataset "' +
label+'" for id "', id_ref, '"')
return (id_ref, res[0])
ref = val
id_ref = id
raise LookupError('Convergence not reached, enlarge the dataset'
' or change tolerance values')
[docs] def get_times(self):
"""
Return the TimeData from the time-*.yaml files in the run_dir
"""
from os import path as p
from futile import Time as T
time_files = {}
run_dir = self.get_global_option('run_dir')
time_files = [p.join(run_dir,
self.fetch_results(id=self.ids[c],
attribute='data_directory')[0],
'time-' + self.names[c] + '.yaml'
) for c in self.results
]
return T.TimeData(*time_files)
[docs] def wait(self):
"""!skip"""
from IPython.display import display, clear_output
from aiida.orm import load_node
running = len(self.results)
while(running != 0):
import time
time.sleep(1)
running = len(self.results)
for c in self.results:
pk = self.results[c]['node'].pk
node = load_node(pk)
if(node.is_finished):
running -= 1
# print(node.is_finished_ok)
clear_output(wait=True)
display(str(running)+" processes still running")
[docs] def get_logfiles(self):
"""
Attempt to obtain the logfiles for completed runs
Returns:
dict:
{runname: Logfile}
"""
logfiles = {}
for c in self.results:
try:
logfiles[c] = self.calculators[0]['calc'].get_logs(
self.results[c]['node'].pk, self.names[c])
except ValueError:
logfiles[c] = self.results[c]
print("no logfile for " + str(c))
return logfiles
[docs]def combine_datasets(*args):
"""
Define a new instance of the dataset class that should provide
as a result a list of the runs of the datasets
"""
full = Dataset(label='combined_dataset')
# append the runs or each dataset
for dt in args:
for irun, runs in enumerate(dt.runs):
calc = dt.get_runner(irun)
id, dt.get_id(irun)
full.append_run(id, calc, **runs)
full.set_postprocessing_function(_combined_postprocessing_functions)
def _combined_postprocessing_functions(runs, results, **kwargs):
pass